分类
联系方式
  1. 新浪微博
  2. E-mail

DartVM 异步实现原理

介绍

Dart 是一门单线程语言,基于消息队列实现异步。在日常 Dart 编程中,经常会写 Future 和 Timer。这种异步机制底层是怎么实现的呢?在本文中进行讲解。

消息队列数据结构

消息队列是存放消息的队列。如下图所示:

其中:

  • 通过 Enqueue() 可以向队列尾部添加消息
  • 通过 Dequeue() 可以从队列头部取出消息

注:Enqueue() 通过设置参数 before_event,可以向队列头部插入消息(插队)

Message

在 DartVM 有,专门有一个 Message 类,表示消息对列上的消息。位于 runtime/vm/message.h。

消息就像一封信,包含几个要素:

  1. 内容:通过消息传递的数据,payload_
  2. 地址:消息送到哪里去,dest_port
  3. 类型:消息分类两类,普通消息和控制消息(OOB)
  4. 优先级:先处理高优

跟生活中寄出一封信,或者寄一个快递是类似的。

Message 实例:Future 异步

Message 的实际使用场景是什么?Future 就是基于 MessageQueue 和 Message 来实现的。

在很多教程文章,都会提到,当执行 Future 时,会将任务插入到队尾,当消息队列执行到该任务时,Future 中的代码才会真正执行,因此实现异步。

Future 往消息队列上插的是什么?对应的 Message 内容是什么?

这封信的要素:

  1. 内容:1
  2. 地址:定时器专用消息处理器地址,Future 和定时器共用一套处理逻辑
  3. 类型:普通消息
  4. 优先级:普通优先级

每个消息都有一个目标地址,目标地址关联着一个消息处理器。Future 和定时器共用一套处理逻辑,对应着一个专门的消息处理器。

消息的内容是个数字,就是 1,这个值定义在 sdk/lib/_internal/vm/lib/timer_impl.dart 中:static const _ZERO_EVENT = 1;

这个 1 有什么特殊意义吗?是个枚举值,用来在处理逻辑中做类型判断用。

Future 异步处理先介绍到这里,补充点基础知识之后再回来看。

Dart Port 机制

Port 端口

Message 的地址长什么样?这套机制由 Dart 虚拟机底层的 Port 机制来提供。

Port 机制说简单也简单,它就是一个整数,在虚拟机中有一个专门的类型定义:

typedef int64_t Dart_Port;

每个数字就是一个 Port,在虚拟机中专门有一个方法用来分配端口号(PortMap::AllocatePort)。

MessageHandler

通过端口(门牌号),消息有了传送的目的地。但是送到目的地,谁负责接收呢?答案是 MessasgeHandler。

MessasgeHandler 顾名思义,消息处理器,专门用来处理消息。它位于 runtime/vm/message_handler.h,方法签名如下:

MessageHandler::MessageStatus MessageHanler::HandleMessage(
    MonitorLocker* ml,
    bool allow_normal_message,
    bool allow_multiple_normal_messages);

在该方法的实现里,MessasgeHandler 会从消息队列中循环取出消息执行。

PortMap

Port 端口和 MessageHandler 之间是什么关系?是映射关系。

有一个专门的类,负责维护 Port 与 MessageHandler 的映射数据结构,它就是 PortMap。它位于 runtime/vm/port.h。

需要注意:在 Dart 虚拟机中有两个 PortMap,一个位于 C/C++ 层,一个位于 Dart 层,两者之间也有联系。这里我们先说 C/C++ 层的,Dart 层的后文再说。

PortMap 中 Port 与 MessageHandler 的映射关系如下图:

其中:

  • 每个端口号都有一个 MessageHandler 与之关联,同时关联的还有一个端口状态 PortState
  • PortSet 是虚拟机自己实现的一个集合类容器,Entry 是其中的元素,每个元素包含端口号-消息处理器-端口状态
  • PortMap 对外提供一个 PortMap::CreatePort(MessageHandler* handler) 方法,用于创建一个端口,并与传入的 handler 绑定,并存入 PortSet 中

端口机制小结

对于 Dart 的端口机制,先小结一下:

  1. Port 端口,是一个整数,是 message 的目的地
  2. 每个端口都有一个唯一的消息处理器 MessageHandler 与之对应
  3. Port 和 MessageHandler 的映射关系,在 PortMap 中进行维护
  4. PortMap 对外提供一个 PortMap::CreatePort(MessageHandler* handler) 方法,用于创建一个端口,并与传入的 handler 绑定,并存入 PortSet 中

Isolate 中的消息队列

前面介绍了消息队列数据结构,消息队列在 Dart 中是如何运用的?

在 Dart 虚拟机里,代码运行在 Isolate 中,消息队列在 Isolate 里。

Isolate

Flutter/Dart 代码是事件驱动的,代码运行在 Isolate 当中。对于 Dart 开发来说,Isolate 相当于线程,基于 Isolate 可以实现无锁并发。

Isolate 的组成结构如下图所示:

其中:

  • Isolate 包含 MessageHandler,MessageHandler 包含消息队列(实际有2条)
  • Isolate 运行在一条线程上,称为 Mutator Thread

Isolate 创建成功后,会启动消息队列,进行消息循环处理。

(注:关于 Isolate 是如何执行 Dart 代码的,会在另一篇文章中进行说明,本文只围绕消息处理主题。)

MessageHandler

MessageHandler 是 Dart 虚拟机中的一个底层类,前文介绍了 MessageHandler 如何处理消息,这里给出 MessageHandler 的完整功能:

  • 维护两条消息队列:
    • quque_ 是消息队列:传递普通消息
    • oob_queue_ 是控制消息队列:传递控制消息
    • MessageHandler 负责创建消息队列
  • 将消息队列放在线程池中执行
  • 通过 MessageHandler::PostMessage 向消息队列发送消息
  • 维护一个 PortSet,与 PortMap 有映射关系

重点说一下 MessageHandler 将消息队列放在线程池中执行:消息队列只是一个数据结构,需要有一个循环逻辑来消费这个队列。这部分逻辑由 MessageHandler::Run 方法提供。

MessageHandler 的组成结构如下图所示:

IsolateMessageHandler

MessageHandler 作为 Isolate 的成员,在 Isolate 创建的时候(Isolate::InitIsolate),Isolate 会通过 PortMap 创建一个新端口,并传入自己的 MessageHandler 与该端口关联。这个端口就是 Isolate 的主端口(main_port)。

MessageHandler 是一个抽象类,Isolate 的 MessageHandler 具体类型是 IsolateMessageHandler,从名称可以看出,该类专门负责 Isolate 的消息处理。

ReceivePort 机制

Port 机制很好理解,一个端口号绑着一个消息处理器,向端口号发送消息,消息处理器就能收到回调。

然而实际的情况要更加复杂。前面的讲解都是基于虚拟机 C/C++ 层的底层实现,而现实中开发者编写的都是 Dart 代码,开发者使用消息回调也都是 Dart 实现,这部分 Dart 层到 C/C++ 层是需要衔接的。为此 Dart 虚拟机提供了 ReceivePort 机制,这是一层对 Port 机制的封装。Dart 层的 Future、Timer 定时器都基于该 ReceivePort 机制。

ReceivePort 的完整类继承关系如下(覆盖 C/C++ 和 Dart 层):

直观理解

关于这些类的细节解读,可以参见我博客中相关文章。这里以一种顶层的宏观理解来概括。

Dart 层的 ReceivePort

在学习 Dart Isolate 间通信的时候,我们知道需要先创建 ReceivePort。ReceivePort 中有一个 SendPort,把 SendPort 传给新 Isolate,新 Isolate 拿到 SendPort,就能向 ReceivePort 回传数据了。实例代码如下:

void demo() {
    ReceivePort port = ReceivePort();
    // 创建 isolate
    isolate.spawn(func, port.sendPort);
    
    port.listen((message) {
        print(message)
    });
}

void func(SendPort sendPort) {
    sendPort.send(1)
}

ReceivePort 与 Port 机制

Dart 侧的 ReceivePort 是 C/C++ 底层 Port 机制的一个具体实现。

当创建 ReceivePort 实例的时候,会在底层创建一个新的端口,通过前面的章节我们知道,新端口要与 MessageHandler 相绑定。

对于 ReceivePort 来说,listen 方法起到的是 MessageHandler 的作用。那底层 C/C++ 的 PortMap 中,新端口对应的 MessageHandler,是这个 Stream 吗?

答案并不是。因为这里涉及到 Dart 到 C/C++ 的跨层,Dart 虚拟机采用了一种间接转发机制。

Dart MessageHandler 转发

当创建 ReceivePort 实例的时候,会在底层创建一个新的端口,这个端口对应的 MessageHandler 绑定的是 Isolate 的 MessageHandler。

也就是说,用 ReceivePort 的 SendPort 发送消息,首先接收到消息的是其 Isolate 的 MessageHandler。

Isolate 的 MessageHandler

Isolate 的 MessageHandler,类型为 IsolateMessageHandler,位于 sdk/lib/_internal/vm/lib/isolate_patch.dart。

解析消息的方法是 IsolateMessageHandler::HandleMessage。

IsolateMessageHandler 会处理多种消息类型:

  • OOB 消息:全称是 out of band,是用于控制 Isolate 的一些控制消息
  • 普通消息

Dart 侧的 ReceivePort 属于普通消息,IsolateMessageHandler::HandleMessage 解析到该消息时,会去获取 ReceivePort 的 MessageHandler。

ReceivePort 实现了 Stream 接口,它的 MessageHandler 实际上是 StreamController.add 方法(参见 sdk/lib/_internal/vm/lib/isolate_patch.dart 的 _ReceivePortImpl.fromRawReceivePort 命名构造方法)。

Dart 侧还有一个 PortMap

IsolateMessageHandler::HandleMessage 是怎么找到 ReceivePort 的 StreamController.add 呢?

前面讲的 PortMap 是一个 C++ 类,将端口与 MessageHandler 相关联,C++ 的 PortMap 关联的 MessageHandler 都是 C++ 的。ReceivePort 的 StreamController.add 是 Dart 的。

答案是 Dart 层还有一个 PortMap,存的是端口号到 Dart MessageHandler 的映射关系。Dart 层 PortMap 的类型签名是:

static final _portMap = <int, Map<String, dynamic>>{};

其中:

  • key 是个 int,比较熟悉吧,他就是 Dart_Port 端口号
  • value 是个嵌套 map,长这样:{ 'port': dart_handler_func }
  • dart_handler_func 就是 ReceivePort 的 StreamController.add

回到 Isolate 的 MessageHandler,它拿到一条普通消息,然后查询 Dart 侧的 PortMap,找到 ReceivePort 的 处理回调(StreamController.add)执行

Isolate ReceivePort 通信机制

至此,我们梳理完成了 Dart 虚拟机 Isolate ReceivePort 通信机制。由于涉及到 Dart/C++ 跨层,中间通过 IsolateMessageHandler::HandleMessage 转发了一次,显得有些绕。

整体流程如下:

Isolate 无锁并发

Isolate 相当于传统意义上的线程,通过 ReceivePort 机制,可以实现无锁并发。通过上面分析,可以知道实现原因。

Isolate 间通信的时候,通过 ReceivePort/SendPort 机制相互首发信息,消息(Message)实际上是发送到对方的消息队列中,因此不存在一个变量同时被多个线程访问的情况。

Timer

Future 和定时器都是日常开发中常用的功能,Future 又是基于定时器实现的,因此首先分析定时器的实现原理。

关系图

定时器也是横跨 Dart 和 C++ 两界,因此涉及到的类也比较多,如下图所示: